create function SplitFunction AS 'com.bj58.zpdata.bigdata.flink.realtime.warehouse.udf.SplitUDTF';
create function splitindex AS 'com.bj58.zpdata.bigdata.flink.realtime.warehouse.udf.SplitIndexUDF';

CREATE TABLE hdp_lbg_zhaopin_ods_zp_ganji_zzdetail_visitor (
    detail_msg string
)
with (
'connector' = 'kafka',
'topic' = ' hdp_lbg_supin_ods_zp_ganji_zzdetail',
'properties.bootstrap.servers' = 'bh-kafka3-1.58dns.org:9092,bh-kafka3-2.58dns.org:9092,bh-kafka3-3.58dns.org:9092,bh-kafka3-4.58dns.org:9092,bh-kafka3-5.58dns.org:9092',
'properties.group.id' = 'hdp_lbg_zhaopin-hdp_lbg_supin_ods_zp_ganji_zzdetail',
'properties.client.id' = 'hdp_lbg_zhaopin-hdp_lbg_supin_ods_zp_ganji_zzdetail-RsuDe',
'scan.startup.mode' = 'group-offsets',
'sink.parallelism' = '16',
'format' = 'raw'
);

CREATE TABLE hdp_lbg_zhaopin_ods_zp_all_zzdetail_visitor (
  abtest string,
  buid bigint,
  cuid bigint,
  infoid bigint,
  sourcetype int,
  infoparams string,
  addtime bigint,
  tjfrom string,
  infodistype string,
  pid string,
  url string
)
with (
'connector' = 'kafka',
'topic' = 'hdp_lbg_zhaopin_ods_zp_all_zzdetail_visitor',
'properties.bootstrap.servers' = 'bh-kafka3-1.58dns.org:9092,bh-kafka3-2.58dns.org:9092,bh-kafka3-3.58dns.org:9092,bh-kafka3-4.58dns.org:9092,bh-kafka3-5.58dns.org:9092',
'properties.client.id' = 'hdp_lbg_zhaopin-hdp_lbg_zhaopin_ods_zp_all_zzdetail_visitor-zvy70',
'scan.startup.mode' = 'group-offsets',
'sink.parallelism' = '24',
'format' = 'json'
);

insert into hdp_lbg_zhaopin_ods_zp_all_zzdetail_visitor
select
    'xuhualei_ganji_2024011017' as abtest,
    cast(coalesce(splitindex(msg,'\0001',38),'-999999') as int) as buid,
    cast(if(splitindex(msg,'\0001',9) is null,'-999999',splitindex(msg,'\0001',9)) as INT) as cuid,
    cast(if(splitindex(msg,'\0001',37) is null,'-999999',splitindex(msg,'\0001',37)) as INT) as infoid,
    2 as sourcetype,
    if(splitindex(msg,'\0001',46) is null,'-',splitindex(msg,'\0001',46) ) as infoparams,
    cast(if(splitindex(msg,'\0001',28) is null,'-999999',splitindex(msg,'\0001',28)) as INT) as addtime,
    if(splitindex(msg,'\0001',34) is null,'-',splitindex(msg,'\0001',34)) as tjfrom,
    if(splitindex(msg,'\0001',45) is null,'-',splitindex(msg,'\0001',45)) as infodistype,
    if(splitindex(msg,'\0001',4) is null,'-',splitindex(msg,'\0001',4)) as pid,
    splitindex(msg,'\0001',0) as url
from (
  select msg
  from hdp_lbg_zhaopin_ods_zp_ganji_zzdetail_visitor
  , LATERAL TABLE(SplitFunction(detail_msg,'\n')) as t(msg)
)aa
;
